package com.niit.TopOne.citytopone;


import com.niit.bean.CityTopOneBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CityTopOneDriver {
    public static void main(String[] args) throws Exception{
        //配置文件对象
        Configuration conf = new Configuration();
        // 创建作业实例
        Job job = Job.getInstance(conf, CityTopOneDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(CityTopOneDriver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(CityTopOneMapper.class);
        job.setReducerClass(CityTopOneReducer.class);

        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(CityTopOneBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(CityTopOneBean.class);
        job.setOutputValueClass(NullWritable.class);
        //todo 设置自定义分组
        job.setGroupingComparatorClass(CityGroupingComparator.class);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, new Path("input/covid/快手app大学生用户.csv"));
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, new Path("output/topOne/citytopone"));
        //判断输出路径是否存在 如果存在删除
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(new Path("output/topOne/citytopone"))){
            fs.delete(new Path("output/topOne/citytopone"),true);
        }
        // 提交作业并等待执行完成
        boolean resultFlag = job.waitForCompletion(true);
        //程序退出
        System.exit(resultFlag ? 0 :1);
    }
}
